package com.dec.kks.etl.model;

import org.apache.log4j.LogManager;
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Streaming scoring driver: reads comma-separated coil samples from a local
 * socket ({@code localhost:9999}), parses each line into a {@link Coil} bean,
 * scores every 10-second micro-batch with a pre-trained regression
 * {@link PipelineModel}, and appends the predictions (minus the intermediate
 * ML "features"/"label" columns) to {@code data/test.csv}.
 *
 * <p>Expected input line format: {@code ecurrent,flow,rise}
 * (double, int, double) — e.g. {@code 1.5,20,0.3}. Malformed lines are
 * dropped instead of failing the job.
 */
public class SchedualModeInSteaminglMain {

    public static void main(String[] args) throws Exception {
        // Re-point log4j at the project's own configuration file.
        LogManager.resetConfiguration();
        PropertyConfigurator.configure("/home/hdfs/soft/dec-project/dec-kks-etl/src/main/resources/log4j.properties");
        System.setProperty("hadoop.home.dir", "/home/hdfs/bigdata/hadoop-2.7.4");

        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("schedual model in stream");
        // 10-second micro-batches.
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        JavaSparkContext spark = jssc.sparkContext();
        SQLContext sqlContext = new SQLContext(spark);
        // Pre-trained regression pipeline saved under model/regress/dct.
        PipelineModel model = PipelineModel.load("model/regress/dct");

        // Parse "ecurrent,flow,rise" lines. A bad line (missing fields or a
        // non-numeric token) maps to null and is filtered out below, so one
        // malformed record cannot kill the whole streaming job with a
        // NumberFormatException / ArrayIndexOutOfBoundsException.
        JavaDStream<Coil> coil = lines
                .map(new Function<String, Coil>() {
                    @Override
                    public Coil call(String s) {
                        try {
                            String[] sample = s.split(",");
                            Coil parsed = new Coil();
                            parsed.setEcurrent(Double.valueOf(sample[0]));
                            parsed.setFlow(Integer.valueOf(sample[1]));
                            parsed.setRise(Double.valueOf(sample[2]));
                            return parsed;
                        } catch (RuntimeException badLine) {
                            // Covers NumberFormatException and short lines.
                            return null;
                        }
                    }
                })
                .filter(new Function<Coil, Boolean>() {
                    @Override
                    public Boolean call(Coil parsed) {
                        return parsed != null;
                    }
                });

        coil.foreachRDD(new VoidFunction<JavaRDD<Coil>>() {
            @Override
            public void call(JavaRDD<Coil> coilRDD) throws Exception {
                Dataset<Row> coilDataFrame = sqlContext.createDataFrame(coilRDD, Coil.class);
                coilDataFrame.show();
                Dataset<Row> res = model.transform(coilDataFrame);
                System.out.println("********************");
                if (res.count() != 0) {
                    res.show();
                    // SaveMode.Append: the default ErrorIfExists mode would
                    // throw on the second non-empty batch because the output
                    // path already exists after the first write.
                    res.drop("features").drop("label")
                            .write().mode(SaveMode.Append).format("csv").save("data/test.csv");
                }
                System.out.println("********************");
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }
}
